Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
6e08302
[Server] Add distributed high-availability node-state + session shari…
marcschier Jun 25, 2026
adcf8ea
[Server] Add secure distributed session manager for HA failover fast-…
marcschier Jun 25, 2026
502f948
[Server] Add real-server integration test for distributed session mir…
marcschier Jun 25, 2026
245b623
[Server] HA security hardening: hash session keys, restore audit even…
marcschier Jun 26, 2026
f6acbdc
[Server] HA docs + sample: Kubernetes deployment guide, session shari…
marcschier Jun 26, 2026
e6166fc
[Docs] plans/32: record A+FG done; add validated B1 (client token-reu…
marcschier Jun 26, 2026
652c85a
[Client] Add opt-in token-reuse failover for HA fast reconnect (REQ-U…
marcschier Jun 26, 2026
4cd265c
[Client] Two-server secured token-reuse failover e2e + HA docs (C)
marcschier Jun 26, 2026
deed6f2
Extract Opc.Ua.Server.Distributed assembly; address PR review feedback
marcschier Jun 26, 2026
4293cd2
Add CRDT active/active replication (Opc.Ua.Server.Distributed.Crdt)
marcschier Jun 26, 2026
e30947d
PR feedback: rename fluent API to Replicated*, integrate CRDT into sa…
marcschier Jun 26, 2026
cf2ea01
PR feedback: organize distributed projects into folders; extract Opc.…
marcschier Jun 26, 2026
4f4d7ed
test: raise Opc.Ua.Server.Distributed.Crdt patch coverage above 80%
marcschier Jun 26, 2026
d265764
feat: OPC UA Part 4 6.6 redundancy - full server + client, transparen…
marcschier Jun 27, 2026
e4f8ac9
feat: widen CRDT extension to all TFMs (Crdt 1.0.2) and remediate 6.6…
marcschier Jun 27, 2026
e8c6dd1
refactor: rename Distributed->Redundancy / Kubernetes->K8s, address r…
marcschier Jun 27, 2026
7ccbcd1
test: fix stale ManagedSession reflection constructor signatures
marcschier Jun 27, 2026
4211a19
build: fix RestrictForLegacyTfm no-op shell on netstandard2.x (CS8021)
marcschier Jun 27, 2026
dbbb7ba
test: cover K8s builder registration; exclude integration-only K8s IO…
marcschier Jun 27, 2026
4870a67
build: make RedundantServer sample follow CustomTestTarget (fix net8 …
marcschier Jun 27, 2026
9e9ea9b
build: don't build net10-only McpServer on net8/net9 all-TFM legs
marcschier Jun 27, 2026
35be537
docs: address PR review - remove redundancy migration section, TFM-av…
marcschier Jun 28, 2026
ddc47b9
Merge remote-tracking branch 'origin/master' into nodestatestorage
marcschier Jun 28, 2026
5b1552a
test: widen CRDT convergence deadline 10s->30s to fix CI starvation f…
marcschier Jun 28, 2026
30e7456
fix(crdt): guarantee value convergence under concurrent multi-writer …
marcschier Jun 28, 2026
6a287f0
samples: simplify RedundantClient to a transparent ManagedSession; ad…
marcschier Jun 28, 2026
eb3b787
refactor: relocate redundancy seams to Opc.Ua.Core (Opc.Ua.Redundancy)
marcschier Jun 29, 2026
c9eabbe
feat(client-redundancy): add Opc.Ua.Client.Redundancy coordinator lib…
marcschier Jun 29, 2026
a1f1b94
feat(client-redundancy): fluent builder, unit tests, HA.md how-to
marcschier Jun 29, 2026
fafd9eb
feat(client-redundancy): token-reuse fast-activate on promotion
marcschier Jun 29, 2026
9eb177c
build: add Opc.Ua.Redundancy using to remaining seam consumers (Aot/S…
marcschier Jun 29, 2026
acf2fab
refactor: tidy Opc.Ua.Client/Session into Redundancy/Subscription/Rec…
marcschier Jun 29, 2026
00c9f94
refactor: fold Opc.Ua.Client.Redundancy into Opc.Ua.Client/Session/Re…
marcschier Jun 29, 2026
a4c3255
refactor: merge Server.Redundancy+.Crdt to Opc.Ua.Redundancy.Server; …
marcschier Jun 29, 2026
b6a6086
feat(client): add IManagedSession:ISession entry-point exposing Redun…
marcschier Jun 29, 2026
ea79636
refactor!: remove RedundantManagedClient; ManagedSession is sole entr…
marcschier Jun 29, 2026
d31c412
feat: add Opc.Ua.Redundancy.Client (CRDT gossip client store for repl…
marcschier Jun 29, 2026
dd8aa8a
feat(client): redundancy transparent + on by default in ConnectAsync
marcschier Jun 29, 2026
7e902d8
docs: migration note for redundancy package re-topology + RedundantMa…
marcschier Jun 29, 2026
b4f8458
test: cover CrdtClientKeyValueStore round-trip (client redundancy)
marcschier Jun 29, 2026
63e669f
feat(sample): add --replicas client replica-set demo (ClientReplicaCo…
marcschier Jun 29, 2026
c6df8e6
test: consolidate redundancy test projects to mirror source
marcschier Jun 29, 2026
f25f2f4
refactor: extract shared CRDT into Opc.Ua.Redundancy; add client CRDT DI
marcschier Jun 29, 2026
1c0a330
feat(redundancy): add IRaftConsensus seam + in-process backend (Raft …
marcschier Jun 30, 2026
6950c63
feat(redundancy): add linearizable RaftSharedKeyValueStore + RaftLead…
marcschier Jun 30, 2026
893227d
feat(redundancy): add HybridSharedKeyValueStore + RedundancyConsisten…
marcschier Jun 30, 2026
be80368
feat(redundancy): server DI consistency-mode selection UseRedundancyC…
marcschier Jun 30, 2026
7244e9f
test(redundancy): nonce/lease auto-wire over Raft CAS (Raft phase 5)
marcschier Jun 30, 2026
2271cf3
feat(redundancy): client DI AddRaftClientSharedStore + mode-aware sto…
marcschier Jun 30, 2026
42455eb
feat(redundancy): scaffold RaftCsConsensus adapter for external RaftC…
marcschier Jun 30, 2026
dc5814f
docs(redundancy): document Raft consistency modes (Raft phase 8)
marcschier Jun 30, 2026
5f05f97
feat(redundancy): pull RaftCs 1.1.0 + Crdt 1.1.0 and wire the real Ra…
marcschier Jun 30, 2026
662a1a8
fix(redundancy): harden Raft proposal liveness + address-space watch …
marcschier Jun 30, 2026
ac5a7e8
feat(redundancy): add RaftCsConsensus.CreateCluster multi-node factor…
marcschier Jul 1, 2026
3268b86
feat(redundancy.k8s): add UseKubernetesRaftConsensus multi-pod wiring…
marcschier Jul 1, 2026
968c131
sample(redundancy): demonstrate strong (Raft) consistency in Redundan…
marcschier Jul 1, 2026
e2e597e
docs(redundancy): document CreateCluster + UseKubernetesRaftConsensus…
marcschier Jul 1, 2026
f5f2c97
test(redundancy): cover K8s durable file-WAL Raft path (rw-D)
marcschier Jul 1, 2026
ed7b8f4
Merge origin/master into nodestatestorage
marcschier Jul 1, 2026
66ead1c
Address PR review: extract continuation-point and sent-message handling
marcschier Jul 1, 2026
f3d91b0
docs(plans): consolidate distributed-HA plans 28-32 into a single rem…
marcschier Jul 1, 2026
43d02c0
feat(redundancy): GetEndpoints load direction across a RedundantServe…
marcschier Jul 1, 2026
d9fe5dd
feat(redundancy): load-direction strong routing, e2e test, sample (R1…
marcschier Jul 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
18 changes: 18 additions & 0 deletions Applications/RedundantClient/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# OPC UA RedundantClient (managed client) sample image.
#
# The build context must be the repository ROOT so the project references resolve.
# Build directly:
# docker build -f Applications/RedundantClient/Dockerfile -t opcua-redundant-client .
# Or via docker compose (see Applications/RedundantServer/docker-compose.*.yml).
FROM mcr.microsoft.com/dotnet/sdk AS build
WORKDIR /src
ENV DOTNET_EnableWriteXorExecute=0
COPY . .
RUN dotnet publish "Applications/RedundantClient/RedundantClient.csproj" \
-c Release -f net10.0 -p:PublishAot=false \
-o /app/publish

FROM mcr.microsoft.com/dotnet/runtime AS final
WORKDIR /app
COPY --from=build /app/publish .
ENTRYPOINT ["dotnet", "RedundantClient.dll"]
351 changes: 351 additions & 0 deletions Applications/RedundantClient/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
/* ========================================================================
* Copyright (c) 2005-2025 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/

#nullable enable

using System;
using System.Collections.Generic;
using System.CommandLine;
using System.CommandLine.Invocation;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Client.Redundancy;
using Opc.Ua.Configuration;
using Opc.Ua.Redundancy;

namespace RedundantClient
{
/// <summary>
/// Entry point for the managed client sample.
/// </summary>
public static class Program
{
/// <summary>
/// Starts the sample.
/// </summary>
public static Task<int> Main(string[] args)
{
var serverOption = new Option<string>("--server", "-s")
{
Description = "Discovery URL of any server in the (optionally) redundant set.",
DefaultValueFactory = _ => "opc.tcp://localhost:62543/RedundantServer"
};
var noSecurityOption = new Option<bool>("--nosecurity")
{
Description = "Select endpoints with MessageSecurityMode.None."
};
var autoAcceptOption = new Option<bool>("--autoaccept")
{
Description = "Automatically accept untrusted server certificates for sample runs."
};
var durationOption = new Option<TimeSpan>("--duration", "-d")
{
Description = "How long to monitor before exiting. Use 00:00:00 to run until Ctrl+C.",
DefaultValueFactory = _ => TimeSpan.FromMinutes(2)
};
var replicasOption = new Option<int>("--replicas")
{
Description = "Run an in-process client replica set of this size (leader holds the session).",
DefaultValueFactory = _ => 1
};

var rootCommand = new RootCommand(
"OPC UA managed client sample that transparently handles server redundancy")
{
serverOption,
noSecurityOption,
autoAcceptOption,
durationOption,
replicasOption
};

rootCommand.SetAction(async (parseResult, cancellationToken) =>
{
await RunAsync(
parseResult.GetValue(serverOption)!,
parseResult.GetValue(noSecurityOption),
parseResult.GetValue(autoAcceptOption),
parseResult.GetValue(durationOption),
parseResult.GetValue(replicasOption),
cancellationToken).ConfigureAwait(false);
});

ParseResult parseResult = rootCommand.Parse(args);
return parseResult.InvokeAsync(new InvocationConfiguration(), CancellationToken.None);
}

private static async Task RunAsync(
string serverUrl,
bool noSecurity,
bool autoAccept,
TimeSpan duration,
int replicas,
CancellationToken ct)
{
ITelemetryContext telemetry = DefaultTelemetry.Create(builder =>
{
builder.SetMinimumLevel(LogLevel.Information);
});
using IDisposable? telemetryDisposable = telemetry as IDisposable;

var application = new ApplicationInstance(telemetry)
{
ApplicationName = kApplicationName,
ApplicationType = ApplicationType.Client,
ConfigSectionName = kConfigSectionName,
CertificatePasswordProvider = new CertificatePasswordProvider([])
};

await using (application.ConfigureAwait(false))
{
ApplicationConfiguration configuration = await application
.LoadApplicationConfigurationAsync(silent: false, ct: ct)
.ConfigureAwait(false);
if (autoAccept)
{
configuration.CertificateManager.AcceptError = (_, _) => true;
}

bool haveCertificate = await application
.CheckApplicationInstanceCertificatesAsync(silent: false, ct: ct)
.ConfigureAwait(false);
if (!haveCertificate)
{
throw new InvalidOperationException("Application instance certificate invalid.");
}

EndpointDescription selectedEndpoint = await CoreClientUtils
.SelectEndpointAsync(configuration, serverUrl, useSecurity: !noSecurity, telemetry, ct)
.ConfigureAwait(false)
?? throw new InvalidOperationException(
$"No endpoint could be selected for '{serverUrl}'.");
var endpoint = new ConfiguredEndpoint(
null,
selectedEndpoint,
EndpointConfiguration.Create(configuration));

Console.WriteLine("Connecting managed client to {0}", serverUrl);

if (replicas > 1)
{
await RunReplicaSetAsync(
configuration, endpoint, telemetry, replicas, duration, ct).ConfigureAwait(false);
return;
}

// A single ManagedSession is the managed client. WithServerRedundancy() lets it

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove "managed client" wording. Key point: "Create a normal managed session, opt it into Server redundancy handling, thats it"

// discover the redundant set (if any) from the connected server and fail over
// transparently; against a server that is not configured for redundancy it simply
// behaves as a resilient reconnecting session. The caller does not need to know the
// server topology before connecting.
ManagedSession session = await new ManagedSessionBuilder(configuration, telemetry)
.UseEndpoint(endpoint)
.WithSessionName(kApplicationName)
.WithUserIdentity(new UserIdentity())
.WithServerRedundancy()
.ConnectAsync(ct)
.ConfigureAwait(false);

await using (session.ConfigureAwait(false))
{
session.ConnectionStateChanged += OnConnectionStateChanged;

await LogRedundancyInfoAsync(session, ct).ConfigureAwait(false);
await SubscribeToCurrentTimeAsync(session, ct).ConfigureAwait(false);

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also subscribe to a "Replicated" value (from ha node manager in redundantServer sample).


Console.WriteLine("Monitoring ServerStatus.CurrentTime. Press Ctrl+C to stop.");
await RunForDurationAsync(duration, ct).ConfigureAwait(false);

session.ConnectionStateChanged -= OnConnectionStateChanged;
}
}
}

private static async Task LogRedundancyInfoAsync(ManagedSession session, CancellationToken ct)
{
var handler = new DefaultServerRedundancyHandler();
ServerRedundancyInfo info = await handler
.FetchRedundancyInfoAsync(session, ct)
.ConfigureAwait(false);
if (info.Mode == RedundancySupport.None)
{
Console.WriteLine(
"Server is not configured for redundancy (RedundancySupport=None); " +
"running as a single resilient session.");
return;
}

Console.WriteLine(
"Server reports RedundancySupport={0}, ServiceLevel={1} ({2}), CurrentServerId={3}.",
info.Mode,
info.ServiceLevel,
info.ServiceLevelSubrange,
info.CurrentServerId);
for (int ii = 0; ii < info.RedundantServers.Count; ii++)
{
RedundantServer server = info.RedundantServers[ii];
Console.WriteLine(
"Peer {0}: uri={1}, state={2}, serviceLevel={3}, endpoint={4}",
ii + 1,
server.ServerUri,
server.ServerState,
server.ServiceLevel,
server.Endpoint?.EndpointUrl?.ToString() ?? "(unresolved)");
}
}

private static async Task SubscribeToCurrentTimeAsync(ManagedSession session, CancellationToken ct)
{
// Ownership of the subscription transfers to the session via AddSubscription;
// the session disposes its subscriptions when it is disposed.
#pragma warning disable CA2000
var subscription = new Subscription(session.DefaultSubscription)
{
DisplayName = "RedundantClient CurrentTime",
PublishingEnabled = true,
PublishingInterval = 1000,
KeepAliveCount = 10,
LifetimeCount = 0,
MinLifetimeInterval = 10_000,
FastDataChangeCallback = OnDataChange
};
session.AddSubscription(subscription);
#pragma warning restore CA2000
await subscription.CreateAsync(ct).ConfigureAwait(false);

var currentTime = new MonitoredItem(subscription.DefaultItem)
{
StartNodeId = VariableIds.Server_ServerStatus_CurrentTime,
AttributeId = Attributes.Value,
DisplayName = "ServerStatus.CurrentTime",
SamplingInterval = 1000,
QueueSize = 10,
DiscardOldest = true
};
subscription.AddItem(currentTime);
await subscription.ApplyChangesAsync(ct).ConfigureAwait(false);
}

private static void OnDataChange(
Subscription subscription,
DataChangeNotification notification,
ArrayOf<string> stringTable)
{
for (int ii = 0; ii < notification.MonitoredItems.Count; ii++)
{
MonitoredItemNotification item = notification.MonitoredItems[ii];
Console.WriteLine(
"CurrentTime={0:o} Status={1}",
item.Value.GetValue(DateTime.MinValue),
item.Value.StatusCode);
}
}

private static void OnConnectionStateChanged(object? sender, ConnectionStateChangedEventArgs e)
{
Console.WriteLine("Connection state: {0} -> {1}", e.PreviousState, e.NewState);
}

private static async Task RunForDurationAsync(TimeSpan duration, CancellationToken ct)
{
try
{
await Task.Delay(
duration <= TimeSpan.Zero ? Timeout.InfiniteTimeSpan : duration,
ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Ctrl+C or the run duration elapsed; exit cleanly.
}
}

private static async Task RunReplicaSetAsync(

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this can be generalized in Opc.Ua.Client or Opc.Ua.Redundancy.Client to be set up using DI (key store, etc.) and that makes for a simpler setup for users that want to use client redundancy? Ideally the user just gets the shape of ISession to worry about and the redundancy is "transparent" (which hides all coordination and sessions that need to be managed). If the "session" was built with > 1 replicas. Whereby if it was built with 0/1 replicas it just defaults to the current implementation and user gets just a ManagedSession (ISession) just like today. Then above would simplify as the setup of the session and subscriptions are the same.

ApplicationConfiguration configuration,
ConfiguredEndpoint endpoint,
ITelemetryContext telemetry,
int replicas,
TimeSpan duration,
CancellationToken ct)
{
// A shared store + lease election make exactly one replica the leader that holds the
// session; followers stand by and take over on leader loss. This runs in-process with an
// in-memory store; a multi-process deployment uses a CAS-capable shared store (Redis) or
// Kubernetes Lease election with the same coordinator.
using var store = new InMemorySharedKeyValueStore();
var coordinators = new List<ClientReplicaCoordinator>();
try
{
for (int i = 0; i < replicas; i++)
{
string nodeId = $"replica-{i + 1}";
// Ownership of the election transfers to the coordinator, which disposes it.
#pragma warning disable CA2000
var election = new SharedStoreLeaseElection(
store, "client-replica/leader", nodeId,
TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(5), TimeProvider.System);
#pragma warning restore CA2000
var options = new ClientReplicaOptions
{
NodeId = nodeId,
Mode = ClientStandbyMode.Cold,
CreateSessionAsync = token => new ValueTask<ManagedSession>(
new ManagedSessionBuilder(configuration, telemetry)
.UseEndpoint(endpoint)
.WithSessionName(nodeId)
.WithUserIdentity(new UserIdentity())
.ConnectAsync(token))
};
var coordinator = new ClientReplicaCoordinator(
options, election, store, NullRecordProtector.Instance, telemetry);
coordinator.RoleChanged += isLeader =>
Console.WriteLine("{0} is now {1}", nodeId, isLeader ? "LEADER" : "follower");
coordinators.Add(coordinator);
await coordinator.StartAsync(ct).ConfigureAwait(false);
}

Console.WriteLine("Client replica set of {0} started; the leader holds the session.", replicas);
await RunForDurationAsync(duration, ct).ConfigureAwait(false);
}
finally
{
foreach (ClientReplicaCoordinator coordinator in coordinators)
{
await coordinator.DisposeAsync().ConfigureAwait(false);
}
}
}

private const string kApplicationName = "RedundantClient";
private const string kConfigSectionName = "RedundantClient";
}
}
Loading