Skip to content
60 changes: 60 additions & 0 deletions src/DynamicData/List/ObservableListEx.Adapt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Injects a side effect into a changeset stream via an <see cref="IChangeSetAdaptor{T}"/>.
/// The adaptor's <c>Adapt</c> method is invoked for each changeset before it is forwarded downstream unchanged.
/// </summary>
/// <typeparam name="T">The type of items in the list.</typeparam>
/// <param name="source">The source <see cref="IObservable{IChangeSet{T}}"/> to observe and adapt.</param>

Check warning on line 31 in src/DynamicData/List/ObservableListEx.Adapt.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has syntactically incorrect cref attribute 'IObservable{IChangeSet{T}}'
/// <param name="adaptor">The <see cref="IChangeSetAdaptor{T}"/> adaptor whose <c>Adapt</c> method is invoked for each changeset.</param>
/// <returns>A list changeset stream identical to the source, with the adaptor side effect applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="adaptor"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// This is the primary extension point for custom UI binding adaptors (e.g., <see cref="Bind{T}(IObservable{IChangeSet{T}}, IObservableCollection{T}, BindingOptions)"/>
/// delegates to this operator). If the adaptor throws, the exception propagates downstream as <c>OnError</c>.
/// </para>
/// </remarks>
/// <seealso cref="Bind{T}(IObservable{IChangeSet{T}}, IObservableCollection{T}, BindingOptions)"/>
public static IObservable<IChangeSet<T>> Adapt<T>(this IObservable<IChangeSet<T>> source, IChangeSetAdaptor<T> adaptor)
where T : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));
adaptor.ThrowArgumentNullExceptionIfNull(nameof(adaptor));

return Observable.Create<IChangeSet<T>>(
observer =>
{
var locker = InternalEx.NewLock();
return source.Synchronize(locker).Select(
changes =>
{
adaptor.Adapt(changes);
return changes;
}).SubscribeSafe(observer);
});
}
}
51 changes: 51 additions & 0 deletions src/DynamicData/List/ObservableListEx.AddKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Adds a key to each item in a list changeset, converting it to a cache changeset that supports all keyed DynamicData operators.
/// </summary>
/// <typeparam name="TObject">The type of items in the list.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <param name="source">The source <see cref="IObservable{IChangeSet{TObject}}"/> to add keys to, converting to a cache changeset.</param>
/// <param name="keySelector">A <see cref="Func{T, TResult}"/> function to extract a unique key from each item.</param>
/// <returns>A cache <see cref="IObservable{IChangeSet{TObject, TKey}}"/> changeset stream with keyed items.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// All index information is dropped during conversion because cache changesets are unordered by default.
/// Use this when you need to transition from list-based pipelines to cache-based operators (Filter by key, Join, Group, etc.).
/// </para>
/// </remarks>
/// <seealso cref="ObservableCacheEx.RemoveKey{TObject, TKey}(IObservable{IChangeSet{TObject, TKey}})"/>
public static IObservable<IChangeSet<TObject, TKey>> AddKey<TObject, TKey>(this IObservable<IChangeSet<TObject>> source, Func<TObject, TKey> keySelector)
where TObject : notnull
where TKey : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));
keySelector.ThrowArgumentNullExceptionIfNull(nameof(keySelector));

return source.Select(changes => new ChangeSet<TObject, TKey>(new AddKeyEnumerator<TObject, TKey>(changes, keySelector)));
}
}
97 changes: 97 additions & 0 deletions src/DynamicData/List/ObservableListEx.And.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Applies a logical AND (intersection) between multiple list changeset streams.
/// Only items present in ALL sources appear in the result.
/// </summary>
/// <typeparam name="T">The type of items in the lists.</typeparam>
/// <param name="source">The first source <see cref="IObservable{IChangeSet{T}}"/> to intersect.</param>
/// <param name="others">The additional <see cref="IObservable{IChangeSet{T}}"/> changeset streams to intersect with.</param>
/// <returns>A list changeset stream containing items that exist in every source.</returns>
/// <exception cref="ArgumentNullException"><paramref name="others"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// Uses reference counting per item across all sources. An item appears downstream only when
/// its reference count is non-zero in ALL sources. Item identity is determined by the default equality comparer.
/// </para>
/// <list type="table">
/// <listheader><term>Event</term><description>Behavior</description></listheader>
/// <item><term>Add/AddRange</term><description>The item's reference count is incremented in its source tracker. If the item is now present in all sources, an <b>Add</b> is emitted.</description></item>
/// <item><term>Replace</term><description>The old item's reference count is decremented and the new item's is incremented. Depending on whether each is present in ALL sources, this emits an <b>Add</b>, <b>Remove</b>, <b>Replace</b>, or nothing.</description></item>
/// <item><term>Remove/RemoveRange/Clear</term><description>The item's reference count is decremented. If it was in the result and is no longer in all sources, a <b>Remove</b> is emitted.</description></item>
/// <item><term>Refresh</term><description>Forwarded as <b>Refresh</b> if the item is currently in the result.</description></item>
/// <item><term>Moved</term><description>Ignored (set operations are position-independent).</description></item>
/// </list>
/// <para><b>Worth noting:</b> Item identity uses object equality, not position. Duplicate items in a single source are reference-counted independently.</para>
/// </remarks>
/// <seealso cref="Or{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <seealso cref="Except{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <seealso cref="Xor{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <seealso cref="ObservableCacheEx.And{TObject, TKey}(IObservable{IChangeSet{TObject, TKey}}, IObservable{IChangeSet{TObject, TKey}}[])"/>
public static IObservable<IChangeSet<T>> And<T>(this IObservable<IChangeSet<T>> source, params IObservable<IChangeSet<T>>[] others)
where T : notnull
{
others.ThrowArgumentNullExceptionIfNull(nameof(others));

return source.Combine(CombineOperator.And, others);
}

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">A <see cref="ICollection{T}"/> of changeset streams to intersect.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload accepts a pre-built collection of sources instead of a params array.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this ICollection<IObservable<IChangeSet<T>>> sources)
where T : notnull => sources.Combine(CombineOperator.And);

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">An <see cref="IObservableList{T}"/> of changeset streams. Sources can be added or removed dynamically.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload supports dynamic source management: adding or removing changeset streams from the observable list triggers re-evaluation.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this IObservableList<IObservable<IChangeSet<T>>> sources)
where T : notnull => sources.Combine(CombineOperator.And);

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">An <see cref="IObservableList{IObservableList{T}}"/> of <see cref="IObservableList{IObservableList{T}}"/>. Each inner list's changes are connected automatically.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload accepts <see cref="IObservableList{T}"/> instances directly, calling <c>Connect()</c> internally.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this IObservableList<IObservableList<T>> sources)
where T : notnull => sources.Combine(CombineOperator.And);

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">An <see cref="IObservableList{ISourceList{T}}"/> of <see cref="ISourceList{T}"/>. Each inner list's changes are connected automatically.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload accepts <see cref="ISourceList{T}"/> instances directly, calling <c>Connect()</c> internally.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this IObservableList<ISourceList<T>> sources)
where T : notnull => sources.Combine(CombineOperator.And);
}
64 changes: 64 additions & 0 deletions src/DynamicData/List/ObservableListEx.AsObservableList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Wraps a <see cref="ISourceList{T}"/> as a read-only <see cref="IObservableList{T}"/>, hiding mutation methods.
/// </summary>
/// <typeparam name="T">The type of items in the list.</typeparam>
/// <param name="source">The <see cref="ISourceList{T}"/> mutable source list to wrap.</param>
/// <returns>A read-only observable list that mirrors the source.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
public static IObservableList<T> AsObservableList<T>(this ISourceList<T> source)
where T : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return new AnonymousObservableList<T>(source);
}

/// <summary>
/// Materializes a changeset stream into a read-only <see cref="IObservableList{T}"/>.
/// The list is kept in sync with the source stream for the lifetime of the subscription.
/// </summary>
/// <typeparam name="T">The type of items in the list.</typeparam>
/// <param name="source">The source <see cref="IObservable{IChangeSet{T}}"/> to materialize into a read-only list.</param>
/// <returns>A read-only observable list reflecting the current state of the stream.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// This is the primary way to <b>multicast</b> a changeset pipeline. Materializing once into an <see cref="IObservableList{T}"/>,
/// then calling <c>Connect()</c> on the result for each downstream consumer, ensures the upstream operators are evaluated only once
/// regardless of how many subscribers consume the result.
/// </para>
/// </remarks>
/// <seealso cref="AsObservableList{T}(ISourceList{T})"/>
public static IObservableList<T> AsObservableList<T>(this IObservable<IChangeSet<T>> source)
where T : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return new AnonymousObservableList<T>(source);
}
}
Loading
Loading