-
Notifications
You must be signed in to change notification settings - Fork 1.1k
OPC UA Part 4 6.6 Redundancy (server + client) with opt-in distributed high-availability #3918
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6e08302
adcf8ea
502f948
245b623
f6acbdc
e6166fc
652c85a
4cd265c
deed6f2
4293cd2
e30947d
cf2ea01
4f4d7ed
d265764
e4f8ac9
e8c6dd1
7ccbcd1
4211a19
dbbb7ba
4870a67
9e9ea9b
35be537
ddc47b9
5b1552a
30e7456
6a287f0
eb3b787
c9eabbe
a1f1b94
fafd9eb
9eb177c
acf2fab
00c9f94
a4c3255
b6a6086
ea79636
d31c412
dd8aa8a
7e902d8
b4f8458
63e669f
c6df8e6
f25f2f4
1c0a330
6950c63
893227d
be80368
7244e9f
2271cf3
42455eb
dc5814f
5f05f97
662a1a8
ac5a7e8
3268b86
968c131
e2e597e
f5f2c97
ed7b8f4
66ead1c
f3d91b0
43d02c0
d9fe5dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| # OPC UA RedundantClient (managed client) sample image. | ||
| # | ||
| # The build context must be the repository ROOT so the project references resolve. | ||
| # Build directly: | ||
| # docker build -f Applications/RedundantClient/Dockerfile -t opcua-redundant-client . | ||
| # Or via docker compose (see Applications/RedundantServer/docker-compose.*.yml). | ||
| FROM mcr.microsoft.com/dotnet/sdk AS build | ||
| WORKDIR /src | ||
| ENV DOTNET_EnableWriteXorExecute=0 | ||
| COPY . . | ||
| RUN dotnet publish "Applications/RedundantClient/RedundantClient.csproj" \ | ||
| -c Release -f net10.0 -p:PublishAot=false \ | ||
| -o /app/publish | ||
|
|
||
| FROM mcr.microsoft.com/dotnet/runtime AS final | ||
| WORKDIR /app | ||
| COPY --from=build /app/publish . | ||
| ENTRYPOINT ["dotnet", "RedundantClient.dll"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,351 @@ | ||
| /* ======================================================================== | ||
| * Copyright (c) 2005-2025 The OPC Foundation, Inc. All rights reserved. | ||
| * | ||
| * OPC Foundation MIT License 1.00 | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person | ||
| * obtaining a copy of this software and associated documentation | ||
| * files (the "Software"), to deal in the Software without | ||
| * restriction, including without limitation the rights to use, | ||
| * copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the | ||
| * Software is furnished to do so, subject to the following | ||
| * conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be | ||
| * included in all copies or substantial portions of the Software. | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | ||
| * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES | ||
| * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | ||
| * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT | ||
| * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | ||
| * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | ||
| * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR | ||
| * OTHER DEALINGS IN THE SOFTWARE. | ||
| * | ||
| * The complete license agreement can be found here: | ||
| * http://opcfoundation.org/License/MIT/1.00/ | ||
| * ======================================================================*/ | ||
|
|
||
| #nullable enable | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.CommandLine; | ||
| using System.CommandLine.Invocation; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.Extensions.Logging; | ||
| using Opc.Ua; | ||
| using Opc.Ua.Client; | ||
| using Opc.Ua.Client.Redundancy; | ||
| using Opc.Ua.Configuration; | ||
| using Opc.Ua.Redundancy; | ||
|
|
||
| namespace RedundantClient | ||
| { | ||
| /// <summary> | ||
| /// Entry point for the managed client sample. | ||
| /// </summary> | ||
| public static class Program | ||
| { | ||
| /// <summary> | ||
| /// Starts the sample. | ||
| /// </summary> | ||
| public static Task<int> Main(string[] args) | ||
| { | ||
| var serverOption = new Option<string>("--server", "-s") | ||
| { | ||
| Description = "Discovery URL of any server in the (optionally) redundant set.", | ||
| DefaultValueFactory = _ => "opc.tcp://localhost:62543/RedundantServer" | ||
| }; | ||
| var noSecurityOption = new Option<bool>("--nosecurity") | ||
| { | ||
| Description = "Select endpoints with MessageSecurityMode.None." | ||
| }; | ||
| var autoAcceptOption = new Option<bool>("--autoaccept") | ||
| { | ||
| Description = "Automatically accept untrusted server certificates for sample runs." | ||
| }; | ||
| var durationOption = new Option<TimeSpan>("--duration", "-d") | ||
| { | ||
| Description = "How long to monitor before exiting. Use 00:00:00 to run until Ctrl+C.", | ||
| DefaultValueFactory = _ => TimeSpan.FromMinutes(2) | ||
| }; | ||
| var replicasOption = new Option<int>("--replicas") | ||
| { | ||
| Description = "Run an in-process client replica set of this size (leader holds the session).", | ||
| DefaultValueFactory = _ => 1 | ||
| }; | ||
|
|
||
| var rootCommand = new RootCommand( | ||
| "OPC UA managed client sample that transparently handles server redundancy") | ||
| { | ||
| serverOption, | ||
| noSecurityOption, | ||
| autoAcceptOption, | ||
| durationOption, | ||
| replicasOption | ||
| }; | ||
|
|
||
| rootCommand.SetAction(async (parseResult, cancellationToken) => | ||
| { | ||
| await RunAsync( | ||
| parseResult.GetValue(serverOption)!, | ||
| parseResult.GetValue(noSecurityOption), | ||
| parseResult.GetValue(autoAcceptOption), | ||
| parseResult.GetValue(durationOption), | ||
| parseResult.GetValue(replicasOption), | ||
| cancellationToken).ConfigureAwait(false); | ||
| }); | ||
|
|
||
| ParseResult parseResult = rootCommand.Parse(args); | ||
| return parseResult.InvokeAsync(new InvocationConfiguration(), CancellationToken.None); | ||
| } | ||
|
|
||
| private static async Task RunAsync( | ||
| string serverUrl, | ||
| bool noSecurity, | ||
| bool autoAccept, | ||
| TimeSpan duration, | ||
| int replicas, | ||
| CancellationToken ct) | ||
| { | ||
| ITelemetryContext telemetry = DefaultTelemetry.Create(builder => | ||
| { | ||
| builder.SetMinimumLevel(LogLevel.Information); | ||
| }); | ||
| using IDisposable? telemetryDisposable = telemetry as IDisposable; | ||
|
|
||
| var application = new ApplicationInstance(telemetry) | ||
| { | ||
| ApplicationName = kApplicationName, | ||
| ApplicationType = ApplicationType.Client, | ||
| ConfigSectionName = kConfigSectionName, | ||
| CertificatePasswordProvider = new CertificatePasswordProvider([]) | ||
| }; | ||
|
|
||
| await using (application.ConfigureAwait(false)) | ||
| { | ||
| ApplicationConfiguration configuration = await application | ||
| .LoadApplicationConfigurationAsync(silent: false, ct: ct) | ||
| .ConfigureAwait(false); | ||
| if (autoAccept) | ||
| { | ||
| configuration.CertificateManager.AcceptError = (_, _) => true; | ||
| } | ||
|
|
||
| bool haveCertificate = await application | ||
| .CheckApplicationInstanceCertificatesAsync(silent: false, ct: ct) | ||
| .ConfigureAwait(false); | ||
| if (!haveCertificate) | ||
| { | ||
| throw new InvalidOperationException("Application instance certificate invalid."); | ||
| } | ||
|
|
||
| EndpointDescription selectedEndpoint = await CoreClientUtils | ||
| .SelectEndpointAsync(configuration, serverUrl, useSecurity: !noSecurity, telemetry, ct) | ||
| .ConfigureAwait(false) | ||
| ?? throw new InvalidOperationException( | ||
| $"No endpoint could be selected for '{serverUrl}'."); | ||
| var endpoint = new ConfiguredEndpoint( | ||
| null, | ||
| selectedEndpoint, | ||
| EndpointConfiguration.Create(configuration)); | ||
|
|
||
| Console.WriteLine("Connecting managed client to {0}", serverUrl); | ||
|
|
||
| if (replicas > 1) | ||
| { | ||
| await RunReplicaSetAsync( | ||
| configuration, endpoint, telemetry, replicas, duration, ct).ConfigureAwait(false); | ||
| return; | ||
| } | ||
|
|
||
| // A single ManagedSession is the managed client. WithServerRedundancy() lets it | ||
| // discover the redundant set (if any) from the connected server and fail over | ||
| // transparently; against a server that is not configured for redundancy it simply | ||
| // behaves as a resilient reconnecting session. The caller does not need to know the | ||
| // server topology before connecting. | ||
| ManagedSession session = await new ManagedSessionBuilder(configuration, telemetry) | ||
| .UseEndpoint(endpoint) | ||
| .WithSessionName(kApplicationName) | ||
| .WithUserIdentity(new UserIdentity()) | ||
| .WithServerRedundancy() | ||
| .ConnectAsync(ct) | ||
| .ConfigureAwait(false); | ||
|
|
||
| await using (session.ConfigureAwait(false)) | ||
| { | ||
| session.ConnectionStateChanged += OnConnectionStateChanged; | ||
|
|
||
| await LogRedundancyInfoAsync(session, ct).ConfigureAwait(false); | ||
| await SubscribeToCurrentTimeAsync(session, ct).ConfigureAwait(false); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should also subscribe to a "Replicated" value (from ha node manager in redundantServer sample). |
||
|
|
||
| Console.WriteLine("Monitoring ServerStatus.CurrentTime. Press Ctrl+C to stop."); | ||
| await RunForDurationAsync(duration, ct).ConfigureAwait(false); | ||
|
|
||
| session.ConnectionStateChanged -= OnConnectionStateChanged; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static async Task LogRedundancyInfoAsync(ManagedSession session, CancellationToken ct) | ||
| { | ||
| var handler = new DefaultServerRedundancyHandler(); | ||
| ServerRedundancyInfo info = await handler | ||
| .FetchRedundancyInfoAsync(session, ct) | ||
| .ConfigureAwait(false); | ||
| if (info.Mode == RedundancySupport.None) | ||
| { | ||
| Console.WriteLine( | ||
| "Server is not configured for redundancy (RedundancySupport=None); " + | ||
| "running as a single resilient session."); | ||
| return; | ||
| } | ||
|
|
||
| Console.WriteLine( | ||
| "Server reports RedundancySupport={0}, ServiceLevel={1} ({2}), CurrentServerId={3}.", | ||
| info.Mode, | ||
| info.ServiceLevel, | ||
| info.ServiceLevelSubrange, | ||
| info.CurrentServerId); | ||
| for (int ii = 0; ii < info.RedundantServers.Count; ii++) | ||
| { | ||
| RedundantServer server = info.RedundantServers[ii]; | ||
| Console.WriteLine( | ||
| "Peer {0}: uri={1}, state={2}, serviceLevel={3}, endpoint={4}", | ||
| ii + 1, | ||
| server.ServerUri, | ||
| server.ServerState, | ||
| server.ServiceLevel, | ||
| server.Endpoint?.EndpointUrl?.ToString() ?? "(unresolved)"); | ||
| } | ||
| } | ||
|
|
||
| private static async Task SubscribeToCurrentTimeAsync(ManagedSession session, CancellationToken ct) | ||
| { | ||
| // Ownership of the subscription transfers to the session via AddSubscription; | ||
| // the session disposes its subscriptions when it is disposed. | ||
| #pragma warning disable CA2000 | ||
| var subscription = new Subscription(session.DefaultSubscription) | ||
| { | ||
| DisplayName = "RedundantClient CurrentTime", | ||
| PublishingEnabled = true, | ||
| PublishingInterval = 1000, | ||
| KeepAliveCount = 10, | ||
| LifetimeCount = 0, | ||
| MinLifetimeInterval = 10_000, | ||
| FastDataChangeCallback = OnDataChange | ||
| }; | ||
| session.AddSubscription(subscription); | ||
| #pragma warning restore CA2000 | ||
| await subscription.CreateAsync(ct).ConfigureAwait(false); | ||
|
|
||
| var currentTime = new MonitoredItem(subscription.DefaultItem) | ||
| { | ||
| StartNodeId = VariableIds.Server_ServerStatus_CurrentTime, | ||
| AttributeId = Attributes.Value, | ||
| DisplayName = "ServerStatus.CurrentTime", | ||
| SamplingInterval = 1000, | ||
| QueueSize = 10, | ||
| DiscardOldest = true | ||
| }; | ||
| subscription.AddItem(currentTime); | ||
| await subscription.ApplyChangesAsync(ct).ConfigureAwait(false); | ||
| } | ||
|
|
||
| private static void OnDataChange( | ||
| Subscription subscription, | ||
| DataChangeNotification notification, | ||
| ArrayOf<string> stringTable) | ||
| { | ||
| for (int ii = 0; ii < notification.MonitoredItems.Count; ii++) | ||
| { | ||
| MonitoredItemNotification item = notification.MonitoredItems[ii]; | ||
| Console.WriteLine( | ||
| "CurrentTime={0:o} Status={1}", | ||
| item.Value.GetValue(DateTime.MinValue), | ||
| item.Value.StatusCode); | ||
| } | ||
| } | ||
|
|
||
| private static void OnConnectionStateChanged(object? sender, ConnectionStateChangedEventArgs e) | ||
| { | ||
| Console.WriteLine("Connection state: {0} -> {1}", e.PreviousState, e.NewState); | ||
| } | ||
|
|
||
| private static async Task RunForDurationAsync(TimeSpan duration, CancellationToken ct) | ||
| { | ||
| try | ||
| { | ||
| await Task.Delay( | ||
| duration <= TimeSpan.Zero ? Timeout.InfiniteTimeSpan : duration, | ||
| ct).ConfigureAwait(false); | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Ctrl+C or the run duration elapsed; exit cleanly. | ||
| } | ||
| } | ||
|
|
||
| private static async Task RunReplicaSetAsync( | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this can be generalized in Opc.Ua.Client or Opc.Ua.Redundancy.Client to be set up using DI (key store, etc.) and that makes for a simpler setup for users that want to use client redundancy? Ideally the user just gets the shape of ISession to worry about and the redundancy is "transparent" (which hides all coordination and sessions that need to be managed). If the "session" was built with > 1 replicas. Whereby if it was built with 0/1 replicas it just defaults to the current implementation and user gets just a ManagedSession (ISession) just like today. Then above would simplify as the setup of the session and subscriptions are the same. |
||
| ApplicationConfiguration configuration, | ||
| ConfiguredEndpoint endpoint, | ||
| ITelemetryContext telemetry, | ||
| int replicas, | ||
| TimeSpan duration, | ||
| CancellationToken ct) | ||
| { | ||
| // A shared store + lease election make exactly one replica the leader that holds the | ||
| // session; followers stand by and take over on leader loss. This runs in-process with an | ||
| // in-memory store; a multi-process deployment uses a CAS-capable shared store (Redis) or | ||
| // Kubernetes Lease election with the same coordinator. | ||
| using var store = new InMemorySharedKeyValueStore(); | ||
| var coordinators = new List<ClientReplicaCoordinator>(); | ||
| try | ||
| { | ||
| for (int i = 0; i < replicas; i++) | ||
| { | ||
| string nodeId = $"replica-{i + 1}"; | ||
| // Ownership of the election transfers to the coordinator, which disposes it. | ||
| #pragma warning disable CA2000 | ||
| var election = new SharedStoreLeaseElection( | ||
| store, "client-replica/leader", nodeId, | ||
| TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(5), TimeProvider.System); | ||
| #pragma warning restore CA2000 | ||
| var options = new ClientReplicaOptions | ||
| { | ||
| NodeId = nodeId, | ||
| Mode = ClientStandbyMode.Cold, | ||
| CreateSessionAsync = token => new ValueTask<ManagedSession>( | ||
| new ManagedSessionBuilder(configuration, telemetry) | ||
| .UseEndpoint(endpoint) | ||
| .WithSessionName(nodeId) | ||
| .WithUserIdentity(new UserIdentity()) | ||
| .ConnectAsync(token)) | ||
| }; | ||
| var coordinator = new ClientReplicaCoordinator( | ||
| options, election, store, NullRecordProtector.Instance, telemetry); | ||
| coordinator.RoleChanged += isLeader => | ||
| Console.WriteLine("{0} is now {1}", nodeId, isLeader ? "LEADER" : "follower"); | ||
| coordinators.Add(coordinator); | ||
| await coordinator.StartAsync(ct).ConfigureAwait(false); | ||
| } | ||
|
|
||
| Console.WriteLine("Client replica set of {0} started; the leader holds the session.", replicas); | ||
| await RunForDurationAsync(duration, ct).ConfigureAwait(false); | ||
| } | ||
| finally | ||
| { | ||
| foreach (ClientReplicaCoordinator coordinator in coordinators) | ||
| { | ||
| await coordinator.DisposeAsync().ConfigureAwait(false); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private const string kApplicationName = "RedundantClient"; | ||
| private const string kConfigSectionName = "RedundantClient"; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove "managed client" wording. Key point: "Create a normal managed session, opt it into Server redundancy handling, thats it"