Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@
mlsService: mlsService
)

private lazy var teamMemberDiscoveryAgent = TeamMemberDiscoveryAgent(
api: teamsAPI,
store: teamLocalStore,
journal: journal
)

public private(set) lazy var incrementalSync = IncrementalSync(
selfClientID: selfClientID,
pushChannelAPI: pushChannelAPI,
Expand All @@ -411,7 +417,8 @@
liveBrokenGroupSubject: liveBrokenGroupSubject,
journal: journal,
mlsGroupRepairAgent: mlsGroupRepairAgent,
earService: earService
earService: earService,
teamMemberDiscoveryAgent: teamMemberDiscoveryAgent
)

public lazy var incrementalSyncV2: IncrementalSyncV2 = if let sharedContainerURL {
Expand All @@ -430,6 +437,7 @@
journal: journal,
mlsGroupRepairAgent: mlsGroupRepairAgent,
earService: earService,
teamMemberDiscoveryAgent: teamMemberDiscoveryAgent,
createPushChannelState: { [selfClientID] in
PushChannelState(sharedContainerURL: sharedContainerURL, clientID: selfClientID)
}
Expand Down Expand Up @@ -844,7 +852,7 @@
)

public lazy var workAgent: WorkAgent = .init(scheduler: PriorityOrderWorkItemScheduler())

Check warning on line 855 in WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift

View workflow job for this annotation

GitHub Actions / Test Results

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode
public lazy var conversationUpdatesGenerator: IncrementalGeneratorProtocol = ConversationUpdatesGenerator(
repository: conversationRepository,
context: syncContext,
Expand All @@ -853,7 +861,7 @@
self?.workAgent.submitItem(workItem)
}
)

Check warning on line 864 in WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift

View workflow job for this annotation

GitHub Actions / Test Results

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode
public lazy var commitPendingProposalsGenerator: LiveGeneratorProtocol = CommitPendingProposalsGenerator(
repository: conversationRepository,
mlsService: mlsService,
Expand All @@ -866,7 +874,7 @@
self?.workAgent.submitItem(workItem)
}
)

Check warning on line 877 in WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift

View workflow job for this annotation

GitHub Actions / Test Results

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode
public lazy var invalidMLSGroupGenerator: IncrementalGeneratorProtocol = InvalidMLSGroupGenerator(
context: syncContext,
mlsService: mlsService,
Expand Down Expand Up @@ -901,7 +909,7 @@
liveBrokenGroupSubject.sink { [weak self] liveMLSBrokenGroups in
guard let self else { return }
WireLogger.mls.debug("detected during live sync \(liveMLSBrokenGroups.count) broken MLS groups")
let item = RepairBrokenMLSGroupsItem(repairAgent: mlsGroupRepairAgent)

Check warning on line 912 in WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift

View workflow job for this annotation

GitHub Actions / Test Results

Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure; this is an error in the Swift 6 language mode

Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure; this is an error in the Swift 6 language mode
Task {
await self.workAgent.submitItem(item)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
private let journal: Journal
private let mlsGroupRepairAgent: MLSGroupRepairAgentProtocol
private let earService: EARServiceInterface
private let teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol

public init(
selfClientID: String,
Expand All @@ -59,7 +60,8 @@
liveBrokenGroupSubject: PassthroughSubject<Set<String>, Never>,
journal: Journal,
mlsGroupRepairAgent: MLSGroupRepairAgentProtocol,
earService: EARServiceInterface
earService: EARServiceInterface,
teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol
) {
self.selfClientID = selfClientID
self.pushChannelAPI = pushChannelAPI
Expand All @@ -74,6 +76,7 @@
self.journal = journal
self.mlsGroupRepairAgent = mlsGroupRepairAgent
self.earService = earService
self.teamMemberDiscoveryAgent = teamMemberDiscoveryAgent
}

public func perform() async throws -> Token {
Expand Down Expand Up @@ -161,14 +164,18 @@

await mlsGroupRepairAgent.repairConversations()

if !backgroundAccessibleOnly {
await teamMemberDiscoveryAgent.discoverMembers()
}

let liveEventTask = Task { @Sendable [self] in

Check warning on line 171 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'self' with non-Sendable type 'IncrementalSync' in a '@Sendable' closure; this is an error in the Swift 6 language mode

Capture of 'self' with non-Sendable type 'IncrementalSync' in a '@sendable' closure; this is an error in the Swift 6 language mode
logger.info("handling live event stream", attributes: .incrementalSyncV2, .safePublic)
syncStateSubject.send(.liveSyncing(.ongoing))

do {
// because we might be interrupted when in background, we wrap the sync in an expiringActivity that
// will cancel the task - not keeping any db operation (sqlite file opened) in suspend mode
try await withExpiringActivity(reason: "processLiveStream IncrementalSync") {

Check warning on line 178 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'self' with non-Sendable type 'IncrementalSync' in an isolated closure; this is an error in the Swift 6 language mode

Capture of 'self' with non-Sendable type 'IncrementalSync' in an isolated closure; this is an error in the Swift 6 language mode
await processLiveEvents(
liveEventStream: liveEventStream,
processedEnvelopeIDs: processedEnvelopeIDs,
Expand Down Expand Up @@ -349,7 +356,7 @@
privateKeys: privateKeys,
backgroundAccessibleOnly: backgroundAccessibleOnly
)
let envelopes = envelopesWithObjectIDs.map(\.envelope)

Check warning on line 359 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift

View workflow job for this annotation

GitHub Actions / Test Results

Initialization of immutable value 'envelopesObjectIDs' was never used; consider replacing with assignment to '_' or removing it

Initialization of immutable value 'envelopesObjectIDs' was never used; consider replacing with assignment to '_' or removing it
let envelopesObjectIDs = envelopesWithObjectIDs.map(\.objectID)

guard !envelopes.isEmpty else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import Foundation
import WireCoreCrypto
import WireDataModel
import WireLogging

Check warning on line 23 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift

View workflow job for this annotation

GitHub Actions / Test Results

Add '@preconcurrency' to suppress 'Sendable'-related warnings from module 'WireNetwork'

Add '@preconcurrency' to suppress 'Sendable'-related warnings from module 'WireNetwork'
import WireNetwork

public typealias CreatePushChannelStateClosure = () -> PushChannelStateProtocol
Expand Down Expand Up @@ -52,6 +52,7 @@
private let createPushChannelState: CreatePushChannelStateClosure
private let mlsGroupRepairAgent: MLSGroupRepairAgentProtocol
private let earService: EARServiceInterface
private let teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol

weak var delegate: (any LiveSyncDelegate)?

Expand All @@ -70,6 +71,7 @@
journal: Journal,
mlsGroupRepairAgent: MLSGroupRepairAgentProtocol,
earService: EARServiceInterface,
teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol,
createPushChannelState: @escaping CreatePushChannelStateClosure,
syncMarkerGenerator: @escaping SyncMarkerGenerator = { UUID().uuidString }
) {
Expand All @@ -87,6 +89,7 @@
self.journal = journal
self.mlsGroupRepairAgent = mlsGroupRepairAgent
self.earService = earService
self.teamMemberDiscoveryAgent = teamMemberDiscoveryAgent
self.syncMarkerGenerator = syncMarkerGenerator
self.createPushChannelState = createPushChannelState
}
Expand Down Expand Up @@ -152,6 +155,8 @@

await mlsGroupRepairAgent.repairConversations()

await teamMemberDiscoveryAgent.discoverMembers()

let task = Task { @Sendable [self] in
logger.debug("handling live event stream", attributes: logAttributes)
syncStateSubject.send(.liveSyncing(.ongoing))
Expand All @@ -160,14 +165,14 @@
// because we might be interrupted when in background, we wrap the sync in an expiringActivity that will
// cancel the task (not keeping any file lock in suspend mode)
try await withExpiringActivity(reason: "processLiveStream IncrementalSyncV2") {
await processLiveStream(

Check warning on line 168 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'liveEventStream' with non-Sendable type 'PushChannelV2.Stream' (aka 'AsyncThrowingStream<PushChannelV2.Element, any Error>') in a '@Sendable' closure; this is an error in the Swift 6 language mode

Capture of 'liveEventStream' with non-Sendable type 'PushChannelV2.Stream' (aka 'AsyncThrowingStream<PushChannelV2.Element, any Error>') in a '@sendable' closure; this is an error in the Swift 6 language mode

Check warning on line 168 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'liveEventStream' with non-Sendable type 'PushChannelV2.Stream' (aka 'AsyncThrowingStream<PushChannelV2.Element, any Error>') in an isolated closure; this is an error in the Swift 6 language mode

Capture of 'liveEventStream' with non-Sendable type 'PushChannelV2.Stream' (aka 'AsyncThrowingStream<PushChannelV2.Element, any Error>') in an isolated closure; this is an error in the Swift 6 language mode
liveEventStream,
pushChannel: pushChannel,
syncMarker: syncMarker
)

WireLogger.sync.debug("Live stream ended, close push channel", attributes: logAttributes)
await pushChannel.close()

Check warning on line 175 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'pushChannelState' with non-Sendable type 'any PushChannelStateProtocol' in a '@Sendable' closure; this is an error in the Swift 6 language mode

Capture of 'pushChannelState' with non-Sendable type 'any PushChannelStateProtocol' in a '@sendable' closure; this is an error in the Swift 6 language mode

Check warning on line 175 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'pushChannelState' with non-Sendable type 'any PushChannelStateProtocol' in an isolated closure; this is an error in the Swift 6 language mode

Capture of 'pushChannelState' with non-Sendable type 'any PushChannelStateProtocol' in an isolated closure; this is an error in the Swift 6 language mode
await pushChannelState.markAsClosed()
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Wire
// Copyright (C) 2026 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation

// sourcery: AutoMockable
/// Discovers self-team members by replaying recent team notifications,
/// working around the 2000-member cap on the legacy bulk team-members endpoint.
public protocol TeamMemberDiscoveryAgentProtocol {

/// Fetch recent team notifications and apply any discovered
/// `team.member-join` events to local storage.
func discoverMembers() async

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//
// Wire
// Copyright (C) 2026 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation
import WireLogging
import WireNetwork

struct TeamMemberDiscoveryAgent: TeamMemberDiscoveryAgentProtocol {

private let api: any TeamsAPI
private let store: any TeamLocalStoreProtocol
private let journal: Journal

// MARK: - Life cycle

init(
api: any TeamsAPI,
store: any TeamLocalStoreProtocol,
journal: Journal
) {
self.api = api
self.store = store
self.journal = journal
}

// MARK: - TeamMemberDiscoveryAgentProtocol

func discoverMembers() async {
guard let selfTeamID = await store.selfTeamID() else {
WireLogger.sync.debug("team member discovery: self user is not in a team")
return
}

do {
try await runDiscovery(selfTeamID: selfTeamID)
} catch TeamsAPIError.missedEvents {
WireLogger.sync.warn(
"team member discovery: server lost cursor history, resetting and re-running",
attributes: .safePublic
)
journal[.lastTeamNotificationID] = nil
do {
try await runDiscovery(selfTeamID: selfTeamID)
} catch {
WireLogger.sync.error(
"team member discovery failed after cursor reset: \(String(describing: error))"
)
}
} catch {
WireLogger.sync.error("team member discovery failed: \(String(describing: error))")
}
}
Comment thread
caldrian marked this conversation as resolved.

private func runDiscovery(selfTeamID: UUID) async throws {
let sinceNotificationID = journal[.lastTeamNotificationID].flatMap(UUID.init(uuidString:))
let pager = try api.getNotifications(sinceNotificationID: sinceNotificationID)

var discoveredCount = 0
for try await notifications in pager {
let teamMembersInfo = notifications.map { notification -> TeamMemberInfo in
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit-pick(subjective): Consider renaming teamMemberInfos to make it clear that this is an collection

switch notification.kind {
case let .memberJoin(event):
TeamMemberInfo(
id: event.userID,
selfPermission: nil,
creatorID: nil,
creationDate: event.time
)
}
}

guard !teamMembersInfo.isEmpty else { continue }

try await store.storeTeamMembers(
selfTeamID: selfTeamID,
teamMembersInfo: teamMembersInfo
)
discoveredCount += teamMembersInfo.count

// Persist forward progress per page so an interruption
// doesn't force a full re-walk on the next run.
if let lastID = notifications.last?.id {
journal[.lastTeamNotificationID] = lastID.uuidString
}
}

WireLogger.sync.debug("team member discovery: stored \(discoveredCount) member(s)")
}

}
12 changes: 12 additions & 0 deletions WireDomain/Sources/WireDomain/Utilities/Journal/JournalKey.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,18 @@ public extension JournalKey where Value == Bool {

}

public extension JournalKey where Value == String? {

/// The id of the last team notification processed by
/// `TeamMemberDiscoveryAgent`, used as a cursor on subsequent runs.

static let lastTeamNotificationID = Self(
"lastTeamNotificationID",
defaultValue: nil
)

}

public extension JournalKey where Value == Set<String> {

/// The set of MLS group IDs to be repaired.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import CoreData
import GenericMessageProtocol
import XCTest

@testable import WireDataModel
@testable import WireDataModelSupport

Check warning on line 25 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift

View workflow job for this annotation

GitHub Actions / Test Results

Add '@preconcurrency' to suppress 'Sendable'-related warnings from module 'WireDomain'

Add '@preconcurrency' to suppress 'Sendable'-related warnings from module 'WireDomain'
@testable import WireDomain
@testable import WireDomainSupport
@testable import WireNetwork
Expand All @@ -45,6 +46,7 @@
var mlsGroupRepairAgent: MockMLSGroupRepairAgentProtocol!
var cancellables: Set<AnyCancellable>!
var earService: MockEARServiceInterface!
private var teamMemberDiscoveryAgentMock: MockTeamMemberDiscoveryAgentProtocol!
fileprivate var notificationContext: MockNotificationContext!

override func setUp() {
Expand All @@ -64,6 +66,8 @@
mlsGroupRepairAgent = MockMLSGroupRepairAgentProtocol()
cancellables = Set<AnyCancellable>()
earService = MockEARServiceInterface()
teamMemberDiscoveryAgentMock = MockTeamMemberDiscoveryAgentProtocol()
teamMemberDiscoveryAgentMock.discoverMembers_MockMethod = {}

Check failure on line 70 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this closure is empty, or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=wireapp_wire-ios&issues=AZ4_is-HNHGdey943c1N&open=AZ4_is-HNHGdey943c1N&pullRequest=4720
notificationContext = MockNotificationContext()

earService.underlyingIsLocked = false
Expand All @@ -83,12 +87,14 @@
liveBrokenGroupSubject: liveBrokenGroupSubject,
journal: journal,
mlsGroupRepairAgent: mlsGroupRepairAgent,
earService: earService
earService: earService,
teamMemberDiscoveryAgent: teamMemberDiscoveryAgentMock
)
Comment thread
caldrian marked this conversation as resolved.
}

override func tearDown() {
sut = nil
teamMemberDiscoveryAgentMock = nil
journal = nil
pushChannelAPI = nil
updateEventsSync = nil
Expand Down Expand Up @@ -309,7 +315,7 @@
}.store(in: &cancellables)

let token = try await sut.perform()
await token.task.value

Check warning on line 318 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift

View workflow job for this annotation

GitHub Actions / Test Results

Instance method 'wait' is unavailable from asynchronous contexts; Use await fulfillment(of:timeout:enforceOrder:) instead; this is an error in the Swift 6 language mode

Instance method 'wait' is unavailable from asynchronous contexts; Use await fulfillment(of:timeout:enforceOrder:) instead; this is an error in the Swift 6 language mode
wait(for: [expectation], timeout: 5)

// Then live events were decrypted (duplicates skipped).
Expand Down Expand Up @@ -387,13 +393,13 @@
// Repair broken MLS conversations
mlsGroupRepairAgent.repairConversations_MockMethod = {}

// When

Check warning on line 396 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift

View workflow job for this annotation

GitHub Actions / Test Results

Type 'IncrementalSync.Token' does not conform to the 'Sendable' protocol; this is an error in the Swift 6 language mode

Type 'IncrementalSync.Token' does not conform to the 'Sendable' protocol; this is an error in the Swift 6 language mode

Check warning on line 396 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift

View workflow job for this annotation

GitHub Actions / Test Results

Type 'IncrementalSync.Token' does not conform to the 'Sendable' protocol; this is an error in the Swift 6 language mode

Type 'IncrementalSync.Token' does not conform to the 'Sendable' protocol; this is an error in the Swift 6 language mode

Check warning on line 396 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift

View workflow job for this annotation

GitHub Actions / Test Results

Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure; this is an error in the Swift 6 language mode

Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure; this is an error in the Swift 6 language mode
let task = Task {
try await sut.perform()
}

// Then
do {

Check warning on line 402 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift

View workflow job for this annotation

GitHub Actions / Test Results

Type 'IncrementalSync.Token' does not conform to the 'Sendable' protocol; this is an error in the Swift 6 language mode

Type 'IncrementalSync.Token' does not conform to the 'Sendable' protocol; this is an error in the Swift 6 language mode
_ = try await task.value
} catch {
XCTAssertEqual(pushChannel.close_Invocations.count, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import Combine
import CoreData
import XCTest

@testable import WireDataModel
@testable import WireDataModelSupport
@testable import WireDomain
Expand Down Expand Up @@ -47,6 +48,7 @@
var journal: Journal!
var cancellables: Set<AnyCancellable>!
var earService: MockEARServiceInterface!
private var teamMemberDiscoveryAgentMock: MockTeamMemberDiscoveryAgentProtocol!

override func setUp() {
pushChannelAPI = MockPushChannelV2API()
Expand Down Expand Up @@ -75,6 +77,8 @@
liveBrokenGroupSubject = .init()
cancellables = .init()
earService = MockEARServiceInterface()
teamMemberDiscoveryAgentMock = MockTeamMemberDiscoveryAgentProtocol()
teamMemberDiscoveryAgentMock.discoverMembers_MockMethod = {}

Check failure on line 81 in WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncV2Tests.swift

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this closure is empty, or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=wireapp_wire-ios&issues=AZ4_is4INHGdey943c1M&open=AZ4_is4INHGdey943c1M&pullRequest=4720

sut = IncrementalSyncV2(
selfClientID: Scaffolding.selfClientID,
Expand All @@ -91,6 +95,7 @@
journal: journal,
mlsGroupRepairAgent: mlsGroupRepairAgent,
earService: earService,
teamMemberDiscoveryAgent: teamMemberDiscoveryAgentMock,
createPushChannelState: {
self.pushChannelState
},
Expand All @@ -110,6 +115,7 @@

override func tearDown() {
sut = nil
teamMemberDiscoveryAgentMock = nil
pushChannelAPI = nil
mlsGroupRepairAgent = nil
pullServerTimeSync = nil
Expand Down
Loading
Loading